package com.offcn.bigdata.spark.p3

import org.apache.spark.{SparkConf, SparkContext}

/*
    累加器
        其作用就相当于计数器
    累加器使用时需要注意的两点问题：
        累加器的结果调用只能在action触发之后
        要避免进行累加的rdd被重复使用，造成重复累加，或者在调用完累加器之后，立即重置累加器的值，避免重复累加
 */
/**
 * Demonstrates Spark's built-in long accumulator:
 *  - accumulator updates happen inside a lazy transformation, so the value
 *    is 0 before the first action and correct only after one has run;
 *  - re-running an action over the same (uncached) RDD replays the map and
 *    double-counts, hence the explicit `reset()` between actions.
 */
object _05AccumulatorOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
            .setAppName(_05AccumulatorOps.getClass.getSimpleName) // plain String; no interpolation needed
            .setMaster("local[*]")

        val sc = new SparkContext(conf)

        val list = sc.parallelize(List(
            "a is abstraction in spark is",
            "shared spark that abstraction be is in spark abstraction"
        ))
        // Driver-side named accumulator; executors add to it, driver reads .value.
        // Never reassigned, so `val` (was `var`).
        val sparkAccu = sc.longAccumulator("sparkAccu")
        // Lazy transformation: sparkAccu is only incremented when an action runs.
        val pairs = list.flatMap(_.split("\\s+")).map(word => {
            if(word == "spark") {
                sparkAccu.add(1L)
            }
            (word, 1)
        })

        val ret = pairs.reduceByKey(_+_)

        // Before any action: expected to print 0.
        println("===action触发前==sparkCount: " + sparkAccu.value)
        ret.foreach(println)

        // After the action: the map has executed, so the count is populated.
        println("===action触发后==sparkCount: " + sparkAccu.value)
        sparkAccu.reset() // reset the accumulator so the next action does not double-count
        println("======含有累加器的rdd被重复使用======")
        // Second action over the same uncached RDD re-runs the map and
        // re-increments the accumulator — this is the pitfall being demonstrated.
        pairs.count()
        println("===action触发后==sparkCount: " + sparkAccu.value)

        // Keep the JVM alive so the Spark web UI stays inspectable (demo only).
        Thread.sleep(1000000)
        sc.stop()
    }
}
